Skip to content

Conversation

@carlosgjs
Copy link
Collaborator

@carlosgjs carlosgjs commented Oct 8, 2025

Summary

Initial version of the Processing service V2.

Closes #971
Closes #968
Closes #969

Current state

The V2 path is working but disabled by default in this PR to allow for extended testing. When enabled, starting a job creates a queue for that job and populates with one task per image. The tasks can be pulled and ACKed via the APIs introduced in PR #1046. The new path can be enabled for a project via the async_pipeline_workers feature flag.

List of Changes

  • Added NATS JetStream to the docker compose. I also tried RabbitMQ and Beanstalkd, but they don't support the visibility timeout semantics we want or a disconnected mode of pulling and ACKing tasks.
  • Added TaskStateManager and TaskQueueManager
  • Added the queuing and async results processing logic

TODOs:

Related Issues

See issues #970 and #971.

How to Test the Changes

This path can be enabled by turning on the job.project.feature_flags.async_pipeline_workers feature flag, see ami/jobs/models.py:400:

        if job.project.feature_flags.async_pipeline_workers:
            cls.queue_images_to_nats(job, images)
        else:
            cls.process_images(job, images)

And running the ami worker from RolnickLab/ami-data-companion#94

Test

image

Test both modes by tweaking the flag in the django admin console:
image

Deployment Notes

Checklist

  • I have tested these changes appropriately.
  • I have added and/or modified relevant tests.
  • I updated relevant documentation or comments.
  • I have verified that this PR follows the project's coding standards.
  • Any dependent changes have already been merged to main.

Summary by CodeRabbit

  • New Features

    • Optional asynchronous pipeline processing via NATS: queueing/reserving, acknowledgements, atomic progress tracking, and a feature flag to enable async workers.
  • Chores

    • Added NATS service and configuration (local/CI/staging), environment variable and default setting, and client dependency.
  • Documentation

    • README entry documenting the NATS dashboard URL and localhost note.
  • Tests

    • New tests covering the queue manager and task-state tracking.

✏️ Tip: You can customize this high-level summary in your review settings.

@netlify
Copy link

netlify bot commented Oct 8, 2025

Deploy Preview for antenna-preview canceled.

Name Link
🔨 Latest commit 936d768
🔍 Latest deploy log https://app.netlify.com/projects/antenna-preview/deploys/696a92e1e3a408000840738e

@mihow
Copy link
Collaborator

mihow commented Oct 8, 2025

Exciting!

@carlosgjs carlosgjs marked this pull request as ready for review October 24, 2025 18:49
Copilot AI review requested due to automatic review settings October 24, 2025 18:49
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces Processing Service V2, enabling a pull-based task queue architecture using NATS JetStream instead of the push-based Celery approach. Workers can now pull tasks via HTTP endpoints, process them independently, and acknowledge completion without maintaining persistent connections.

Key changes:

  • Added NATS JetStream integration for distributed task queuing with configurable visibility timeouts
  • Introduced new REST API endpoints for task pulling (/jobs/{id}/tasks) and result submission (/jobs/{id}/result)
  • Implemented Redis-based progress tracking to handle asynchronous worker updates

Reviewed Changes

Copilot reviewed 15 out of 16 changed files in this pull request and generated 11 comments.

Show a summary per file
File Description
requirements/base.txt Added nats-py dependency for NATS client support
object_model_diagram.md Added comprehensive Mermaid diagram documenting ML pipeline system architecture
docker-compose.yml Added NATS JetStream service with health checks and monitoring
config/settings/base.py Added NATS_URL configuration setting
ami/utils/nats_queue.py New TaskQueueManager class for NATS JetStream operations
ami/jobs/views.py Added task pulling and result submission endpoints with pipeline filtering
ami/jobs/utils.py Helper function for running async code in sync Django context
ami/jobs/tasks.py New Celery task for processing pipeline results asynchronously
ami/jobs/task_state.py TaskStateManager for Redis-based job progress tracking
ami/jobs/models.py Added queue_images_to_nats method and NATS cleanup logic
ami/base/views.py Fixed request.data handling when not a dict
README.md Added NATS dashboard documentation link
.vscode/launch.json Added debug configurations for Django and Celery containers
.envs/.local/.django Added NATS_URL environment variable
.dockerignore Expanded with comprehensive ignore patterns
Comments suppressed due to low confidence (1)

object_model_diagram.md:1

  • The comment at line 13 appears to be template text from instructions rather than actual documentation content. This namedtuple field description doesn't match the file's purpose as an object model diagram.
# Object Model Diagram: ML Pipeline System

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@carlosgjs carlosgjs requested a review from mihow October 24, 2025 18:59
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Oct 31, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

📝 Walkthrough

Walkthrough

Adds NATS JetStream-backed task queuing, a TaskQueueManager, Redis task-state tracking, optional async ML job queuing behind a feature flag, Celery result-processing with NATS ACKing, API endpoints to reserve/submit tasks, tests, Docker/CI/staging and config updates, and a new nats dependency.

Changes

Cohort / File(s) Summary
Configuration & Requirements
\.envs/.local/.django`, `.envs/.ci/.django`, `.envs/.production/.django-example``, config/settings/base.py, ``requirements/base.txt``
Add NATS_URL env entries; expose NATS_URL in settings; add nats-py==2.10.0.
Docker & CI / Staging
docker-compose.yml, docker-compose.ci.yml, docker-compose.staging.yml
Add nats JetStream service (JetStream + monitoring), healthchecks; include nats in django depends_on; minor service entrypoint and restart tweaks.
Docs
README.md
Document NATS dashboard URL and localhost note.
Feature Flags
ami/main/models.py
Add feature flag async_pipeline_workers: bool = False.
NATS Queue Manager
ami/ml/orchestration/nats_queue.py
New async TaskQueueManager and get_connection: manage JetStream connection, per-job stream/consumer lifecycle, publish/reserve/ack, and cleanup.
Orchestration Helpers
ami/ml/orchestration/jobs.py
Add queue_images_to_nats(job, images) and cleanup_nats_resources(job); prepare messages, initialize task state, publish via TaskQueueManager.
Task State Management
ami/ml/orchestration/task_state.py
New TaskStateManager and TaskProgress for Redis-backed per-job per-stage pending lists, progress calculation, TTL, locking, and cleanup.
ML Job Model
ami/jobs/models.py
MLJob.run branches on project.feature_flags.async_pipeline_workers; add MLJob.process_images classmethod and NATS queueing path.
Celery Tasks & Helpers
ami/jobs/tasks.py
Add Celery task process_pipeline_result and helpers log_time, _update_job_progress, _ack_task_via_nats; integrate TaskStateManager, DB save, NATS ACK, timing/logging, and concurrency handling.
API Endpoints & Schema Params
ami/jobs/views.py, ami/utils/requests.py
Implement tasks (GET) endpoint to reserve tasks from NATS and result (POST) to enqueue result processing via Celery; add OpenAPI params ids_only, incomplete_only, batch.
Schemas / Models
ami/ml/schemas.py
Remove queue_timestamp field from PipelineProcessingTask.
Tests
ami/ml/test_nats_queue.py, ami/ml/tests.py, ami/jobs/tests.py
Add unit tests for TaskQueueManager and TaskStateManager; update job tests to call queue_images_to_nats and expect accepted / results_queued.
Module Init / Minor Refactor
ami/ml/orchestration/__init__.py
Remove unconditional re-export from .processing; add explanatory comment.
sequenceDiagram
    participant Client
    participant MLJob
    participant Flags as FeatureFlags
    participant QueueMgr as TaskQueueManager
    participant State as TaskStateManager
    participant Worker
    participant Celery as process_pipeline_result
    participant DB as Database

    Client->>MLJob: run(job, images)
    MLJob->>Flags: check async_pipeline_workers
    alt async enabled
        MLJob->>State: initialize_job(image_ids)
        MLJob->>QueueMgr: queue_images_to_nats(job, images)
        loop per image/batch
            QueueMgr->>QueueMgr: publish_task(job_id, message)
        end
    else sync path
        MLJob->>MLJob: process_images(job, images)
        MLJob->>DB: persist results & progress
    end

    Note over Worker,QueueMgr: Worker reserves tasks from JetStream
    Worker->>QueueMgr: reserve_task(job_id, batch)
    Worker->>Celery: run pipeline, produce result + reply_subject
    Celery->>State: update_state(processed_ids, "process", request_id)
    Celery->>DB: save pipeline results
    Celery->>QueueMgr: acknowledge_task(reply_subject)
    Celery->>State: update_state(processed_ids, "results", request_id)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested labels

backend, ml

Suggested reviewers

  • mihow

Poem

🐇 I hopped through JetStream at dawn’s bright light,
I queued each frame and watched them take flight,
Redis counted carrots as workers ran fast,
Celery nodded when ACKs came at last,
A hop, a queue — the pipeline’s delight. 🥕✨

🚥 Pre-merge checks | ✅ 3 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 73.77% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Title check ❓ Inconclusive The title 'Processing service V2 - Phase 1' is generic and lacks specificity about the main technical changes, though it does reference the overall initiative. Consider a more specific title that highlights the primary technical change, such as 'Add NATS JetStream-based async task queueing for ML pipeline processing' to better convey the core implementation.
✅ Passed checks (3 passed)
Check name Status Explanation
Description check ✅ Passed The description covers all major template sections including summary, list of changes, related issues, testing instructions, and deployment notes, with a comprehensive explanation of the new async processing path.
Linked Issues check ✅ Passed The PR implements queue creation/population for PSv2 Job scheduler (#971) via NATS JetStream, supports pull/ACK workflow (#968, #969), and all required coding objectives from linked issues are met.
Out of Scope Changes check ✅ Passed All changes are focused on implementing the async processing pipeline (TaskQueueManager, TaskStateManager, NATS integration, and related infrastructure); no unrelated or extraneous modifications detected.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

🧹 Recent nitpick comments
ami/ml/orchestration/jobs.py (3)

28-28: Add missing return type hint.

The docstring documents that this function returns bool, but the function signature lacks a return type annotation.

Suggested fix
-def queue_images_to_nats(job: "Job", images: list[SourceImage]):
+def queue_images_to_nats(job: "Job", images: list[SourceImage]) -> bool:

45-51: Avoid calling image.url() twice.

image.url() is invoked twice on line 47 when the attribute exists. If this method involves I/O or computation (e.g., generating a signed URL), this is inefficient.

Suggested fix
     for image in images:
         image_id = str(image.pk)
-        image_url = image.url() if hasattr(image, "url") and image.url() else ""
+        image_url = image.url() if hasattr(image, "url") else ""
         if not image_url:
             job.logger.warning(f"Image {image.pk} has no URL, skipping queuing to NATS for job '{job.pk}'")
             skipped_count += 1
             continue

69-84: Consider using job.logger consistently within the async function.

Lines 72 and 78 use the module-level logger imported from ami.jobs.models, while the rest of the function (lines 39, 49, 63, 105, 108) uses job.logger. Using job.logger would route these logs through the job-specific JobLogHandler for better traceability.

Suggested fix
         async with TaskQueueManager() as manager:
             for image_pk, task in tasks:
                 try:
-                    logger.info(f"Queueing image {image_pk} to stream for job '{job.pk}': {task.image_url}")
+                    job.logger.info(f"Queueing image {image_pk} to stream for job '{job.pk}': {task.image_url}")
                     success = await manager.publish_task(
                         job_id=job.pk,
                         data=task,
                     )
                 except Exception as e:
-                    logger.error(f"Failed to queue image {image_pk} to stream for job '{job.pk}': {e}")
+                    job.logger.error(f"Failed to queue image {image_pk} to stream for job '{job.pk}': {e}")
                     success = False

📜 Recent review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0e350dd and 936d768.

📒 Files selected for processing (1)
  • ami/ml/orchestration/jobs.py
🧰 Additional context used
🧬 Code graph analysis (1)
ami/ml/orchestration/jobs.py (4)
ami/jobs/models.py (3)
  • Job (734-1019)
  • JobState (27-63)
  • logger (1004-1013)
ami/ml/orchestration/nats_queue.py (3)
  • TaskQueueManager (35-300)
  • cleanup_job_resources (284-300)
  • publish_task (126-157)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-125)
  • cleanup (119-125)
  • initialize_job (39-49)
ami/ml/schemas.py (1)
  • PipelineProcessingTask (217-228)
🪛 Ruff (0.14.11)
ami/ml/orchestration/jobs.py

77-77: Do not catch blind exception: Exception

(BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (2)
ami/ml/orchestration/jobs.py (2)

1-26: LGTM!

The cleanup_nats_resources function correctly wraps the async cleanup operation using async_to_sync and properly returns the boolean result. The docstring accurately documents the return value.


88-114: LGTM!

The result handling logic correctly:

  • Counts skipped images as failures
  • Marks job stages as FAILURE when all images are skipped (no tasks to queue)
  • Returns False if any images failed to queue (including skipped ones)
  • Defers progress updates to the async workers when tasks are successfully queued

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ef8f16c and d254867.

📒 Files selected for processing (19)
  • .dockerignore (1 hunks)
  • .envs/.local/.django (1 hunks)
  • .gitignore (1 hunks)
  • .vscode/launch.json (1 hunks)
  • README.md (1 hunks)
  • ami/base/views.py (1 hunks)
  • ami/jobs/models.py (8 hunks)
  • ami/jobs/tasks.py (2 hunks)
  • ami/jobs/views.py (3 hunks)
  • ami/main/models.py (1 hunks)
  • ami/ml/orchestration/jobs.py (1 hunks)
  • ami/ml/orchestration/nats_queue.py (1 hunks)
  • ami/ml/orchestration/task_state.py (1 hunks)
  • ami/ml/orchestration/utils.py (1 hunks)
  • ami/utils/requests.py (2 hunks)
  • config/settings/base.py (2 hunks)
  • docker-compose.yml (4 hunks)
  • object_model_diagram.md (1 hunks)
  • requirements/base.txt (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (8)
ami/ml/orchestration/nats_queue.py (1)
ami/jobs/views.py (1)
  • result (256-339)
ami/ml/orchestration/task_state.py (1)
ami/ml/orchestration/jobs.py (1)
  • cleanup (20-23)
ami/jobs/views.py (3)
ami/jobs/tasks.py (1)
  • process_pipeline_result (45-138)
ami/jobs/models.py (4)
  • Job (727-1012)
  • JobState (27-63)
  • logger (997-1006)
  • final_states (58-59)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (28-294)
  • reserve_task (152-208)
ami/jobs/tasks.py (5)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (28-294)
  • acknowledge_task (210-229)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-97)
  • mark_images_processed (48-61)
  • get_progress (63-90)
ami/ml/orchestration/utils.py (1)
  • run_in_async_loop (8-18)
ami/jobs/models.py (5)
  • Job (727-1012)
  • JobState (27-63)
  • logger (997-1006)
  • update_stage (168-188)
  • save (947-958)
ami/ml/models/pipeline.py (3)
  • save (1115-1121)
  • save_results (809-917)
  • save_results (1107-1108)
ami/ml/orchestration/jobs.py (4)
ami/jobs/models.py (2)
  • Job (727-1012)
  • logger (997-1006)
ami/ml/orchestration/nats_queue.py (3)
  • TaskQueueManager (28-294)
  • cleanup_job_resources (278-294)
  • publish_task (119-150)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-97)
  • cleanup (92-97)
  • initialize_job (38-46)
ami/ml/orchestration/utils.py (1)
  • run_in_async_loop (8-18)
ami/ml/orchestration/utils.py (1)
ami/jobs/models.py (1)
  • logger (997-1006)
ami/base/views.py (1)
ami/main/api/views.py (1)
  • get (1595-1651)
ami/jobs/models.py (3)
ami/ml/orchestration/jobs.py (1)
  • queue_images_to_nats (28-107)
ami/main/models.py (1)
  • SourceImage (1622-1870)
ami/ml/models/pipeline.py (2)
  • process_images (163-278)
  • process_images (1091-1105)
🪛 LanguageTool
object_model_diagram.md

[grammar] ~167-~167: Ensure spelling is correct
Context: ...ts 4. Job tracks progress through JobProgress and JobProgressStageDetail

(QB_NEW_EN_ORTHOGRAPHY_ERROR_IDS_1)

🪛 markdownlint-cli2 (0.18.1)
object_model_diagram.md

15-15: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


31-31: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


38-38: Bare URL used

(MD034, no-bare-urls)


39-39: Bare URL used

(MD034, no-bare-urls)


40-40: Bare URL used

(MD034, no-bare-urls)


41-41: Bare URL used

(MD034, no-bare-urls)


42-42: Bare URL used

(MD034, no-bare-urls)


42-42: Bare URL used

(MD034, no-bare-urls)


43-43: Bare URL used

(MD034, no-bare-urls)


61-61: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


77-77: Bare URL used

(MD034, no-bare-urls)


97-97: Bare URL used

(MD034, no-bare-urls)


118-118: Code block style
Expected: fenced; Actual: indented

(MD046, code-block-style)


122-122: Code block style
Expected: fenced; Actual: indented

(MD046, code-block-style)


126-126: Code block style
Expected: fenced; Actual: indented

(MD046, code-block-style)


130-130: Code block style
Expected: fenced; Actual: indented

(MD046, code-block-style)

🪛 Ruff (0.14.2)
ami/ml/orchestration/nats_queue.py

70-70: Unused method argument: ttr

(ARG002)


73-73: Avoid specifying long messages outside the exception class

(TRY003)


81-81: Do not catch blind exception: Exception

(BLE001)


94-94: Avoid specifying long messages outside the exception class

(TRY003)


103-103: Do not catch blind exception: Exception

(BLE001)


132-132: Avoid specifying long messages outside the exception class

(TRY003)


146-146: Consider moving this statement to an else block

(TRY300)


148-148: Do not catch blind exception: Exception

(BLE001)


149-149: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


164-164: Avoid specifying long messages outside the exception class

(TRY003)


206-206: Do not catch blind exception: Exception

(BLE001)


207-207: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


221-221: Avoid specifying long messages outside the exception class

(TRY003)


226-226: Consider moving this statement to an else block

(TRY300)


227-227: Do not catch blind exception: Exception

(BLE001)


228-228: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


242-242: Avoid specifying long messages outside the exception class

(TRY003)


250-250: Consider moving this statement to an else block

(TRY300)


251-251: Do not catch blind exception: Exception

(BLE001)


252-252: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


266-266: Avoid specifying long messages outside the exception class

(TRY003)


273-273: Consider moving this statement to an else block

(TRY300)


274-274: Do not catch blind exception: Exception

(BLE001)


275-275: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/orchestration/task_state.py

35-35: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


36-36: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

ami/jobs/views.py

33-43: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


73-73: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


218-218: Unused method argument: pk

(ARG002)


236-236: Avoid specifying long messages outside the exception class

(TRY003)


244-244: Loop control variable i not used within loop body

Rename unused i to _i

(B007)


283-283: Avoid specifying long messages outside the exception class

(TRY003)


288-288: Avoid specifying long messages outside the exception class

(TRY003)


298-298: Avoid specifying long messages outside the exception class

(TRY003)


301-301: Avoid specifying long messages outside the exception class

(TRY003)


322-322: Do not catch blind exception: Exception

(BLE001)


323-323: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/jobs/tasks.py

45-45: Unused function argument: self

(ARG001)


120-120: Do not catch blind exception: Exception

(BLE001)


121-121: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


133-133: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


136-136: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

config/settings/base.py

28-28: Unused noqa directive (non-enabled: E231, E501)

Remove unused noqa directive

(RUF100)

ami/ml/orchestration/jobs.py

69-69: Loop control variable i not used within loop body

Rename unused i to _i

(B007)


78-78: Do not catch blind exception: Exception

(BLE001)

ami/ml/orchestration/utils.py

14-14: Do not catch blind exception: Exception

(BLE001)


15-15: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/jobs/models.py

75-75: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


430-430: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)


482-482: Unused blanket noqa directive

Remove unused noqa directive

(RUF100)

carlosgjs and others added 2 commits October 31, 2025 14:33
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
ami/ml/orchestration/nats_queue.py (1)

104-121: TTR mismatch when consumer already exists remains unaddressed.

Per the past review comment, if reserve_task() creates the consumer before publish_task(), the consumer uses the default TASK_TTR. If different TTR values are needed later, the existing consumer configuration won't be updated since JetStream doesn't allow mutating ack_wait on an existing durable consumer. This can cause premature redeliveries.

The suggested fix from the past review was to recreate the consumer when the configured ack_wait differs from the requested TTR. Consider implementing that logic or documenting that a single TTR value is always used.

🧹 Nitpick comments (5)
ami/ml/orchestration/jobs.py (2)

44-50: Avoid duplicate url() call.

image.url() is called twice: once in the condition check and again for assignment. If this method performs any computation or I/O, this is wasteful.

     for image in images:
         image_id = str(image.pk)
-        image_url = image.url() if hasattr(image, "url") and image.url() else ""
+        image_url = image.url() if hasattr(image, "url") else ""
         if not image_url:
             job.logger.warning(f"Image {image.pk} has no URL, skipping queuing to NATS for job '{job.pk}'")
             continue

68-77: Inconsistent logger usage inside async context.

Lines 70 and 76 use the module-level logger, while the rest of the function uses job.logger. This inconsistency can make log correlation difficult when debugging job-specific issues.

             for image_pk, task in tasks:
                 try:
-                    logger.info(f"Queueing image {image_pk} to stream for job '{job.pk}': {task.image_url}")
+                    job.logger.info(f"Queueing image {image_pk} to stream for job '{job.pk}': {task.image_url}")
                     success = await manager.publish_task(
                         job_id=job.pk,
                         data=task,
                     )
                 except Exception as e:
-                    logger.error(f"Failed to queue image {image_pk} to stream for job '{job.pk}': {e}")
+                    job.logger.error(f"Failed to queue image {image_pk} to stream for job '{job.pk}': {e}")
                     success = False
ami/jobs/views.py (2)

299-306: Simplify redundant list comprehension.

All items in queued_tasks already have status: "queued" (set on line 289), making the filter unnecessary.

             return Response(
                 {
                     "status": "accepted",
                     "job_id": job.pk,
-                    "results_queued": len([t for t in queued_tasks if t["status"] == "queued"]),
+                    "results_queued": len(queued_tasks),
                     "tasks": queued_tasks,
                 }
             )

310-317: Use logger.exception() to capture traceback.

logger.exception() automatically includes the traceback, which aids debugging when errors occur in production.

         except Exception as e:
-            logger.error(f"Failed to queue pipeline results for job {job.pk}: {e}")
+            logger.exception(f"Failed to queue pipeline results for job {job.pk}: {e}")
             return Response(
                 {
                     "status": "error",
                     "job_id": job.pk,
                 },
                 status=500,
             )
ami/ml/orchestration/nats_queue.py (1)

143-144: Pydantic v1 syntax used.

.dict() is Pydantic v1 syntax. For Pydantic v2 compatibility, use .model_dump(). This applies throughout the codebase where Pydantic models are serialized.

             # Convert Pydantic model to JSON
-            task_data = json.dumps(data.dict())
+            task_data = json.dumps(data.model_dump())
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2483592 and 3c034a9.

📒 Files selected for processing (4)
  • ami/jobs/tests.py (3 hunks)
  • ami/jobs/views.py (3 hunks)
  • ami/ml/orchestration/jobs.py (1 hunks)
  • ami/ml/orchestration/nats_queue.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
ami/ml/orchestration/nats_queue.py (1)
ami/ml/schemas.py (1)
  • PipelineProcessingTask (217-228)
ami/jobs/views.py (3)
ami/jobs/tasks.py (1)
  • process_pipeline_result (49-144)
ami/ml/schemas.py (1)
  • PipelineTaskResult (231-237)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (32-297)
  • reserve_task (156-211)
🪛 Ruff (0.14.8)
ami/ml/orchestration/nats_queue.py

77-77: Avoid specifying long messages outside the exception class

(TRY003)


85-85: Do not catch blind exception: Exception

(BLE001)


98-98: Avoid specifying long messages outside the exception class

(TRY003)


107-107: Do not catch blind exception: Exception

(BLE001)


135-135: Avoid specifying long messages outside the exception class

(TRY003)


150-150: Consider moving this statement to an else block

(TRY300)


152-152: Do not catch blind exception: Exception

(BLE001)


153-153: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


168-168: Avoid specifying long messages outside the exception class

(TRY003)


209-209: Do not catch blind exception: Exception

(BLE001)


210-210: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


224-224: Avoid specifying long messages outside the exception class

(TRY003)


229-229: Consider moving this statement to an else block

(TRY300)


230-230: Do not catch blind exception: Exception

(BLE001)


231-231: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


245-245: Avoid specifying long messages outside the exception class

(TRY003)


253-253: Consider moving this statement to an else block

(TRY300)


254-254: Do not catch blind exception: Exception

(BLE001)


255-255: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


269-269: Avoid specifying long messages outside the exception class

(TRY003)


276-276: Consider moving this statement to an else block

(TRY300)


277-277: Do not catch blind exception: Exception

(BLE001)


278-278: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/orchestration/jobs.py

75-75: Do not catch blind exception: Exception

(BLE001)

ami/jobs/views.py

255-255: Unused method argument: pk

(ARG002)


308-308: Avoid specifying long messages outside the exception class

(TRY003)


310-310: Do not catch blind exception: Exception

(BLE001)


311-311: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (7)
ami/jobs/tests.py (1)

397-399: LGTM!

The updated assertions correctly reflect the new response structure from the result endpoint, matching the implementation in views.py.

ami/ml/orchestration/jobs.py (2)

11-25: LGTM!

The cleanup function properly returns the boolean result from the async cleanup operation, addressing the earlier feedback.


86-102: LGTM!

The post-queueing logic correctly handles both the empty tasks case (updating both stages) and properly reports success/failure based on queue outcomes.

ami/ml/orchestration/nats_queue.py (3)

181-207: LGTM!

The task reservation logic correctly creates a pull subscription, handles timeouts gracefully, and ensures cleanup via finally block.


213-232: LGTM!

The acknowledgment implementation correctly publishes the ACK message to the reply subject.


281-297: LGTM!

The cleanup logic correctly deletes the consumer before the stream and returns a combined success indicator.

ami/jobs/views.py (1)

237-252: LGTM with note on Pydantic version compatibility.

The NATS task retrieval implementation looks correct. If this project uses or will upgrade to Pydantic v2, the .dict() call (line 246) should be replaced with .model_dump() as part of migration—.dict() is deprecated in Pydantic v2. For this simple no-argument usage, the replacement is straightforward: task.model_dump().

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
ami/jobs/tasks.py (1)

160-160: Use timezone-aware datetime for consistency with Django settings.

The code uses datetime.now() which returns a naive datetime. The inline comment suggests this is intentional ("Use naive datetime in local time"), but Django applications with USE_TZ=True should use timezone-aware timestamps to avoid timezone-related bugs.

Apply this diff:

 def _update_job_progress(job_id: int, stage: str, progress_percentage: float) -> None:
     from ami.jobs.models import Job, JobState  # avoid circular import
+    from django.utils import timezone

     with transaction.atomic():
         job = Job.objects.select_for_update().get(pk=job_id)
         job.progress.update_stage(
             stage,
             status=JobState.SUCCESS if progress_percentage >= 1.0 else JobState.STARTED,
             progress=progress_percentage,
         )
         if stage == "results" and progress_percentage >= 1.0:
             job.status = JobState.SUCCESS
             job.progress.summary.status = JobState.SUCCESS
-            job.finished_at = datetime.now()  # Use naive datetime in local time
+            job.finished_at = timezone.now()
         job.logger.info(f"Updated job {job_id} progress in stage '{stage}' to {progress_percentage*100}%")
         job.save()
🧹 Nitpick comments (4)
ami/jobs/tasks.py (3)

45-45: Consider narrowing the autoretry exception types.

The task uses autoretry_for=(Exception,) which will retry on any exception. This may mask distinct failure modes (e.g., validation errors vs. transient network issues) that should be handled differently. Consider specifying more targeted exceptions or using retry_for with explicit exception types.


123-124: Improve exception handling for NATS acknowledgment.

The code catches a bare Exception and logs via logging.error without the traceback. This makes debugging NATS acknowledgment failures difficult.

Apply this diff to use logging.exception for automatic traceback logging:

         except Exception as ack_error:
-            job.logger.error(f"Error acknowledging task via NATS: {ack_error}")
+            job.logger.exception(f"Error acknowledging task via NATS: {ack_error}")
             # Don't fail the task if ACK fails - data is already saved

Based on static analysis hints.


138-144: Use logging.exception for better error diagnostics.

The exception handlers at lines 139 and 142 use logging.error which doesn't include tracebacks. This makes troubleshooting production issues harder.

Apply this diff:

     except Job.DoesNotExist:
-        logger.error(f"Job {job_id} not found")
+        logger.exception(f"Job {job_id} not found")
         raise
     except Exception as e:
-        logger.error(f"Failed to process pipeline result for job {job_id}: {e}")
+        logger.exception(f"Failed to process pipeline result for job {job_id}: {e}")
         # Celery will automatically retry based on autoretry_for
         raise

Based on static analysis hints.

docker-compose.ci.yml (1)

43-55: Consider renaming container for CI consistency.

The NATS container is named ami_local_nats in a CI compose file. This could cause conflicts if both local and CI stacks run simultaneously, and doesn't follow the naming pattern of other CI resources (e.g., ami_ci_postgres_data).

Consider changing to ami_ci_nats:

   nats:
     image: nats:2.10-alpine
-    container_name: ami_local_nats
+    container_name: ami_ci_nats
     hostname: nats

Alternatively, remove the container_name directive entirely and let docker-compose auto-generate it with the stack name prefix.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3c034a9 and e9d2a1c.

📒 Files selected for processing (5)
  • .envs/.ci/.django (1 hunks)
  • .envs/.production/.django-example (1 hunks)
  • ami/jobs/tasks.py (3 hunks)
  • ami/jobs/tests.py (3 hunks)
  • docker-compose.ci.yml (3 hunks)
✅ Files skipped from review due to trivial changes (1)
  • .envs/.ci/.django
🧰 Additional context used
🧬 Code graph analysis (2)
ami/jobs/tasks.py (4)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (32-297)
  • acknowledge_task (213-232)
ami/ml/orchestration/task_state.py (2)
  • TaskStateManager (17-125)
  • update_state (54-83)
ami/ml/schemas.py (2)
  • PipelineResultsResponse (199-214)
  • summary (179-196)
ami/jobs/models.py (4)
  • Job (734-1019)
  • logger (1004-1013)
  • retry (873-885)
  • save (954-965)
ami/jobs/tests.py (2)
ami/ml/orchestration/jobs.py (1)
  • queue_images_to_nats (28-102)
ami/main/models.py (1)
  • SourceImage (1666-1914)
🪛 Ruff (0.14.8)
ami/jobs/tasks.py

123-123: Do not catch blind exception: Exception

(BLE001)


124-124: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


135-135: Abstract raise to an inner function

(TRY301)


139-139: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


142-142: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (7)
ami/jobs/tasks.py (2)

203-225: LGTM!

The log_time helper provides a clean, functional approach to timing code blocks. The use of time.perf_counter() is appropriate for performance measurements, and the docstring clearly explains the usage pattern.


1-18: LGTM!

The imports are well-organized and appropriate for the new functionality. The use of local imports inside functions (e.g., from ami.jobs.models import Job) to avoid circular dependencies is a good practice.

.envs/.production/.django-example (1)

69-71: LGTM!

The NATS URL configuration is consistent with the NATS service added to docker-compose and matches the pattern used in local and CI environments.

ami/jobs/tests.py (2)

330-338: LGTM!

The test now correctly creates and saves SourceImage instances before passing them to queue_images_to_nats. This ensures each image has a valid primary key, addressing the previous concern about unsaved objects with pk=None.


403-405: LGTM!

The updated assertions correctly reflect the new asynchronous result processing workflow where results are queued to Celery tasks rather than processed synchronously.

docker-compose.ci.yml (2)

25-25: LGTM!

Adding NATS to Django's dependencies is correct, as the new task queue and job processing functionality requires NATS JetStream to be available.


50-50: JetStream and monitoring flags are correctly configured.

The NATS service configuration uses -js to enable JetStream and -m 8222 to start the HTTP monitoring server on the conventional port. Both flags are supported in NATS 2.10 and this combination is the standard way to enable these features.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (9)
docker-compose.ci.yml (1)

19-25: Consider adding explicit health condition to depends_on for NATS.

The django service now depends on nats, but the dependency lacks condition: service_healthy, unlike the minio-init service (line 84). This means Django may start before NATS is fully initialized and healthy, potentially causing early connection failures.

If the application already has robust retry/backoff logic for NATS connections, this is acceptable. Otherwise, consider updating to ensure NATS is healthy:

     depends_on:
       - postgres
       - redis
       - minio-init
       - ml_backend
       - rabbitmq
-      - nats
+      nats:
+        condition: service_healthy
ami/ml/tests.py (3)

860-884: Solid coverage; consider relying more on the public API than _get_progress

This suite gives good coverage of initialization, per-stage tracking, empty jobs, and cleanup. The one downside is the heavy reliance on the private _get_progress helper, which couples tests to implementation details and may make refactors noisier. Where feasible, prefer going through update_state (including for test_progress_tracking and test_stages_independent) and reserve _get_progress calls for very targeted cases where you intentionally want to test that internal behavior.


895-918: Float equality assertions could use assertAlmostEqual for robustness

test_progress_tracking asserts exact equality on floating-point percentages (0.4, 0.8, 1.0). This works today but is a bit brittle to minor implementation changes (e.g., if total is computed differently). Using assertAlmostEqual(progress.percentage, 0.4, places=6) (and similarly for other percentages) would make these tests more resilient while preserving intent.


920-944: Locking behavior is covered; optional extra check for lock lifecycle

test_update_state_with_locking nicely validates that a held lock causes update_state to no-op and that progress resumes after the lock is cleared. If you want to harden this further, you could also assert that update_state does not inadvertently modify the lock when it’s held by another task (e.g., confirm the lock key/value is unchanged after the failed update) to guard against future changes to the lock-release logic.

ami/ml/orchestration/test_nats_queue.py (5)

10-38: Good structure and mocking helpers; small clarity tweaks possible

The use of IsolatedAsyncioTestCase plus _create_sample_task and _create_mock_nats_connection keeps the async tests readable and DRY. One minor improvement would be to document in _create_mock_nats_connection which subset of NATS/JetStream APIs you rely on (e.g., in a comment) so future changes to TaskQueueManager know what needs to be mocked or extended here.


39-49: Context manager lifecycle test is solid; optionally assert __aenter__/__aexit__ effects more strictly

The lifecycle test correctly verifies that a connection is established and closed. If you want this to catch more regressions, you could also assert that manager.nc/manager.js are None after the context exits, and optionally use nc.close.assert_awaited_once() to emphasize the async nature of the close call.


50-64: Publish-path test works but could assert arguments more precisely

Right now the test checks add_stream indirectly via a substring search on call_args. To make this more robust, consider asserting on the actual kwargs used (e.g., stream name and subjects) instead of relying on str(js.add_stream.call_args), which is more brittle to representation changes of mocks.


106-117: ACK behavior test is clear; consider also covering failure/no-connection case

The ACK test correctly ensures the manager publishes b"+ACK" to the supplied reply subject. As a follow-up, you might also add a negative test mirroring test_operations_without_connection_raise_error to ensure acknowledge_task also raises when no connection is open, keeping behavior consistent across APIs.


118-129: Cleanup test validates both consumer and stream deletion

test_cleanup_job_resources confirms that both delete_consumer and delete_stream are invoked and that the method returns True, which is the key behavior here. If you later add error handling (e.g., ignore “not found”), consider extending this test to cover that branch as well.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e9d2a1c and 3d198d0.

📒 Files selected for processing (3)
  • ami/ml/orchestration/test_nats_queue.py (1 hunks)
  • ami/ml/tests.py (1 hunks)
  • docker-compose.ci.yml (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
ami/ml/tests.py (1)
ami/ml/orchestration/task_state.py (4)
  • TaskStateManager (17-125)
  • initialize_job (39-49)
  • _get_progress (85-117)
  • update_state (54-83)
ami/ml/orchestration/test_nats_queue.py (1)
ami/ml/schemas.py (1)
  • PipelineProcessingTask (217-228)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (7)
docker-compose.ci.yml (2)

43-52: NATS service configuration is well-suited for CI testing.

The NATS service is properly configured with:

  • Pinned image version (nats:2.10-alpine) for reproducibility
  • Correct hostname (nats) matching the NATS_URL in environment config
  • JetStream enabled (-js flag) for the task queue feature
  • Monitoring port exposed (-m 8222) with appropriate healthcheck
  • Reasonable healthcheck parameters (10s interval, 5s timeout, 3 retries)

73-73: Minio healthcheck formatting improvement.

The reformatting from single-quoted string to explicit array syntax improves consistency with other healthchecks in the file (NATS, etc.) and aligns with docker-compose best practices for the healthcheck test directive.

ami/ml/tests.py (1)

960-980: Empty-job and cleanup behavior look correct

The expectations for empty jobs (immediately 100% complete) and for cleanup returning None progress afterwards are well aligned with TaskStateManager’s semantics. No changes needed here.

ami/ml/orchestration/test_nats_queue.py (4)

65-89: Reservation success path is well covered

The reserve_task happy path is nicely exercised: deserialization, reply_subject propagation, and unsubscribe are all asserted. This aligns well with the intended PipelineProcessingTask contract; no changes needed here.


90-105: No-message reservation behavior looks correct

test_reserve_task_no_messages appropriately verifies the None return and that the pull subscription is still cleaned up via unsubscribe(). That matches the desired contract for idle queues.


130-137: Naming convention tests nicely lock down per-job resource names

The expectations for stream (job_123), subject (job.123.tasks), and consumer (job-123-consumer) names are explicit and make future refactors safer. This is a good use of tests against private helpers.


138-150: Error-on-missing-connection behavior is correctly asserted

test_operations_without_connection_raise_error ensures that key operations fail fast with a clear RuntimeError when the manager isn’t entered. That’s important guardrail behavior for callers and looks correct as-is.

@carlosgjs
Copy link
Collaborator Author

@mihow I'm not addressing two of coderabbit comments:

  • Use of logger.exception() instead of logger.error() since I don't see it in the rest of the code base and its probably better done as a global change.
  • Use of timezone.now() instead of datetime.now() to update the Job's finished_at because the rest of the codebase uses datetime, I believe because we have django set to USE_TZ=False

@carlosgjs carlosgjs requested a review from mihow December 10, 2025 20:35
@netlify
Copy link

netlify bot commented Dec 19, 2025

Deploy Preview for antenna-ssec ready!

Name Link
🔨 Latest commit 936d768
🔍 Latest deploy log https://app.netlify.com/projects/antenna-ssec/deploys/696a92e1e6dee80008379bb3
😎 Deploy Preview https://deploy-preview-987--antenna-ssec.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
docker-compose.staging.yml (2)

20-23: Fix indentation for the commented NATS dependency.

The commented # - nats has extra indentation compared to the other list items. While YAML allows comments at any indentation, this is inconsistent and could be confusing.

🔎 Proposed fix
     depends_on:
       - postgres
       - redis
-        # - nats  
+      # - nats

69-82: Consider JetStream persistence and container naming.

Two observations:

  1. No JetStream persistence: Without a volume mount for JetStream storage, streams/consumers and any pending messages will be lost on container restart. This may be acceptable for staging/testing, but consider adding persistence if jobs shouldn't be lost across restarts:

    volumes:
      - nats_data:/data
    command: ["-js", "-m", "8222", "-sd", "/data"]
  2. Container name inconsistency: ami_local_nats uses "local" prefix but this is the staging compose file. Consider ami_staging_nats for consistency.

Please confirm whether JetStream persistence is intentionally omitted for staging, or if it should be added to preserve task queues across container restarts.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3d198d0 and 3a73329.

📒 Files selected for processing (2)
  • ami/jobs/tasks.py (3 hunks)
  • docker-compose.staging.yml (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
ami/jobs/tasks.py (5)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (32-297)
  • acknowledge_task (213-232)
ami/ml/orchestration/task_state.py (2)
  • TaskStateManager (17-125)
  • update_state (54-83)
ami/ml/schemas.py (2)
  • PipelineResultsResponse (199-214)
  • summary (179-196)
ami/jobs/models.py (5)
  • Job (734-1019)
  • logger (1004-1013)
  • retry (873-885)
  • JobState (27-63)
  • save (954-965)
ami/ml/models/pipeline.py (3)
  • save_results (814-922)
  • save_results (1120-1121)
  • save (1128-1134)
🪛 Ruff (0.14.8)
ami/jobs/tasks.py

98-98: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


124-124: Abstract raise to an inner function

(TRY301)


127-127: Do not catch blind exception: Exception

(BLE001)


128-128: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


144-144: Do not catch blind exception: Exception

(BLE001)


145-145: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (4)
docker-compose.staging.yml (1)

32-50: LGTM!

Adding restart: always to the core services (django, postgres, redis) improves reliability for the staging environment.

ami/jobs/tasks.py (3)

131-147: LGTM - Error handling strategy is appropriate.

The helper correctly wraps the async NATS acknowledgment and handles failures gracefully without failing the parent task. The comment on line 146 clearly explains the design decision: if data is already saved, it's better to log ACK failures and let NATS redeliver than to fail the entire task.


205-227: LGTM - Clean timing utility.

The implementation is straightforward and correct. Using time.perf_counter() is appropriate for measuring elapsed time, and the partial function pattern allows convenient chaining of timing measurements throughout the task execution.


149-165: Verified: USE_TZ=False is configured—no changes needed.

The project's Django settings explicitly set USE_TZ = False with a comment noting "All time are in local time." With this configuration, datetime.datetime.now() on line 162 is the correct approach and is used consistently throughout the codebase (20+ instances). The explicit comment in the code confirming "Use naive datetime in local time" aligns with the project's timezone strategy.

@carlosgjs carlosgjs changed the title [integration] Processing service V2 Processing service V2 - Phase 1 Jan 6, 2026
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @docker-compose.staging.yml:
- Around line 77-81: Healthcheck uses wget which isn't present in the
nats:2.10-alpine image causing false unhealthy status; update the healthcheck
"test" in the healthcheck block to use a simple shell TCP probe (via sh and
/dev/tcp) or use the CMD-SHELL form to attempt a TCP connection to
localhost:8222/healthz and return success/failure, ensuring it works with the
minimal sh-only Alpine runtime.
🧹 Nitpick comments (1)
docker-compose.staging.yml (1)

20-23: Consider uncommenting the NATS dependency or adding a note.

The nats dependency is commented out, meaning Django and Celery workers could start before NATS is ready. When the async_pipeline_workers feature flag is enabled for a project, this could cause transient connection failures during startup. If the application code includes retry logic for NATS connections, this is acceptable; otherwise, consider uncommenting the dependency.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3a73329 and e241586.

📒 Files selected for processing (1)
  • docker-compose.staging.yml
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (2)
docker-compose.staging.yml (2)

32-32: LGTM!

Adding restart: always to core services is appropriate for staging environments and ensures resilience against transient failures.

Also applies to: 46-46, 50-50


69-76: NATS service configuration looks good.

The JetStream setup with -js flag, appropriate port mappings, and hostname configuration are correct for the PR's task queue requirements.

Also applies to: 82-82

@mihow
Copy link
Collaborator

mihow commented Jan 14, 2026

@mihow review the errors we are getting when calling TaxaList.get_or_create() (TaxaList already exists)

task queuing with acknowledgment support via reply subjects. This allows workers
to pull tasks over HTTP and acknowledge them later without maintaining a persistent
connection to NATS.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mihow - added this comment on the selection of NATS over Rabbit and Beanstalk.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
ami/jobs/views.py (1)

226-250: Add maximum batch size constraint to prevent resource exhaustion.

The batch parameter has no upper bound, allowing users to request arbitrarily large numbers of tasks. A single request with a high batch value ties up the request thread (via async_to_sync), and multiple concurrent requests could exhaust thread resources. Add a configurable max cap.

🤖 Fix all issues with AI agents
In `@ami/jobs/views.py`:
- Around line 270-304: The current code validates and enqueues each item as it
iterates over results (using PipelineTaskResult and
process_pipeline_result.delay), which can dispatch partial work if a later item
fails; change the flow to first validate all items without calling
process_pipeline_result.delay (e.g., iterate results and construct/validate
PipelineTaskResult objects into a temporary list), and only after every item
validates successfully iterate that temporary list to call
process_pipeline_result.delay and populate queued_tasks for the response (keep
using job.pk, reply_subject and result_data.dict() when enqueuing).

In `@ami/ml/orchestration/jobs.py`:
- Around line 41-92: When building tasks, track skipped images (e.g., add a
skipped_count variable incremented in the "if not image_url" branch) and treat
them as failures: after queue_all_images runs, add skipped_count to
failed_queues (or if tasks is empty, mark job as FAILED when skipped_count > 0
instead of SUCCESS). Update logic around TaskStateManager initialization and the
"if tasks:" / else branch so totals (successful_queues, failed_queues) include
skipped_count and job.progress/state reflect that skipped images count as
failures; reference the PipelineProcessingTask creation loop, the skipped image
logger call, the queue_all_images async function, and the final
async_to_sync(queue_all_images)() call to locate where to apply these changes.
♻️ Duplicate comments (1)
ami/jobs/tasks.py (1)

102-130: Post-ACK failures can leave progress stale—ACK last or retry.

ACK is sent before results-stage updates; if update_state / _update_job_progress fails after ACK, NATS won’t redeliver and progress may stay behind. Consider moving ACK after results-stage updates (or explicitly retry on post-ACK failures).

🐛 Proposed fix
-        _ack_task_via_nats(reply_subject, job.logger)
-        # Update job stage with calculated progress
-        progress_info = state_manager.update_state(processed_image_ids, stage="results", request_id=self.request.id)
+        # Update job stage with calculated progress
+        progress_info = state_manager.update_state(processed_image_ids, stage="results", request_id=self.request.id)
@@
-        _update_job_progress(job_id, "results", progress_info.percentage)
+        _update_job_progress(job_id, "results", progress_info.percentage)
+        _ack_task_via_nats(reply_subject, job.logger)
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e241586 and 1202063.

📒 Files selected for processing (4)
  • ami/jobs/tasks.py
  • ami/jobs/views.py
  • ami/ml/orchestration/jobs.py
  • ami/ml/orchestration/nats_queue.py
🧰 Additional context used
🧬 Code graph analysis (3)
ami/ml/orchestration/nats_queue.py (1)
ami/ml/schemas.py (1)
  • PipelineProcessingTask (217-228)
ami/jobs/views.py (3)
ami/jobs/tasks.py (1)
  • process_pipeline_result (47-130)
ami/ml/schemas.py (1)
  • PipelineTaskResult (231-237)
ami/ml/orchestration/nats_queue.py (2)
  • TaskQueueManager (35-300)
  • reserve_task (159-214)
ami/ml/orchestration/jobs.py (4)
ami/jobs/models.py (4)
  • Job (734-1019)
  • JobState (27-63)
  • logger (1004-1013)
  • save (954-965)
ami/ml/orchestration/nats_queue.py (3)
  • TaskQueueManager (35-300)
  • cleanup_job_resources (284-300)
  • publish_task (126-157)
ami/ml/orchestration/task_state.py (3)
  • TaskStateManager (17-125)
  • cleanup (119-125)
  • initialize_job (39-49)
ami/ml/schemas.py (1)
  • PipelineProcessingTask (217-228)
🪛 Ruff (0.14.11)
ami/ml/orchestration/nats_queue.py

80-80: Avoid specifying long messages outside the exception class

(TRY003)


88-88: Do not catch blind exception: Exception

(BLE001)


101-101: Avoid specifying long messages outside the exception class

(TRY003)


110-110: Do not catch blind exception: Exception

(BLE001)


138-138: Avoid specifying long messages outside the exception class

(TRY003)


153-153: Consider moving this statement to an else block

(TRY300)


155-155: Do not catch blind exception: Exception

(BLE001)


156-156: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


171-171: Avoid specifying long messages outside the exception class

(TRY003)


212-212: Do not catch blind exception: Exception

(BLE001)


213-213: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


227-227: Avoid specifying long messages outside the exception class

(TRY003)


232-232: Consider moving this statement to an else block

(TRY300)


233-233: Do not catch blind exception: Exception

(BLE001)


234-234: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


248-248: Avoid specifying long messages outside the exception class

(TRY003)


256-256: Consider moving this statement to an else block

(TRY300)


257-257: Do not catch blind exception: Exception

(BLE001)


258-258: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


272-272: Avoid specifying long messages outside the exception class

(TRY003)


279-279: Consider moving this statement to an else block

(TRY300)


280-280: Do not catch blind exception: Exception

(BLE001)


281-281: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/jobs/views.py

253-253: Unused method argument: pk

(ARG002)


306-306: Avoid specifying long messages outside the exception class

(TRY003)


308-308: Do not catch blind exception: Exception

(BLE001)


309-309: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/jobs/tasks.py

98-98: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


124-124: Abstract raise to an inner function

(TRY301)


127-127: Do not catch blind exception: Exception

(BLE001)


128-130: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


146-146: Do not catch blind exception: Exception

(BLE001)


147-147: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

ami/ml/orchestration/jobs.py

75-75: Do not catch blind exception: Exception

(BLE001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Redirect rules
  • GitHub Check: Header rules
  • GitHub Check: Pages changed
  • GitHub Check: test
🔇 Additional comments (8)
ami/ml/orchestration/jobs.py (1)

10-25: LGTM: cleanup helper returns success status.

Returning the async cleanup result allows callers to react to failures.

ami/ml/orchestration/nats_queue.py (4)

26-63: LGTM: connection lifecycle is cleaned up.

Context manager setup/teardown is clear and ensures the client is closed.


77-125: LGTM: per-job stream/consumer provisioning is clear.


126-214: LGTM: publish/reserve flow and reply_subject mapping.


216-300: LGTM: ACK and cleanup helpers are straightforward.

ami/jobs/tasks.py (3)

133-148: LGTM: ACK helper encapsulates NATS call cleanly.


151-166: LGTM: atomic progress update under row lock.


207-229: LGTM: lightweight timing helper is clear.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

PSv2 Job scheduler Reference PSv2 implementation PSv2 Pull API design and specification

3 participants